package rocketmqPT

// 普通队列
import (
	"context"
	"fmt"
	"time"

	"go_stu/src/demo2/conf"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// Producer creates a RocketMQ producer pointed at the name server configured
// in conf.ROCKETMQBROKER, starts it, and synchronously sends ten text messages
// to the "SimpleTopic" topic, printing each send result with a timestamp.
//
// It panics if the producer cannot be created, started, or shut down, or if
// any send fails; every panic message carries the underlying error text.
// Shutdown is deferred right after creation, so it runs both on normal return
// and while unwinding from one of those panics.
func Producer() {
	// WithRetry(2) configures the producer's send retry count; this is a
	// producer-side send option, not consumer redelivery.
	p, err := rocketmq.NewProducer(
		producer.WithNameServer([]string{conf.ROCKETMQBROKER}),
		producer.WithRetry(2),
	)
	if err != nil {
		panic("创建生产者失败: " + err.Error())
	}
	defer func() {
		if err := p.Shutdown(); err != nil {
			panic("关闭producer失败: " + err.Error())
		}
	}()

	if err := p.Start(); err != nil {
		panic("启动producer失败: " + err.Error())
	}

	ctx := context.Background()
	for i := 0; i < 10; i++ {
		body := fmt.Sprintf("第%d条, 消息", i)
		res, err := p.SendSync(ctx, primitive.NewMessage("SimpleTopic", []byte(body)))
		if err != nil {
			panic("消息发送失败" + err.Error())
		}
		now := time.Now().Format("2006-01-02 15:04:05")
		fmt.Printf("%s: 消息: %s发送成功 \n", now, res.String())
	}
}

// Consumer creates a RocketMQ push consumer in group "test" against the name
// server configured in conf.ROCKETMQBROKER, subscribes to the "SimpleTopic"
// topic with an empty message selector, starts consuming, and then blocks
// forever. Every received message body is printed with a timestamp and
// acknowledged with consumer.ConsumeSuccess.
//
// It panics if the consumer cannot be created, subscribed, started, or shut
// down; every panic message carries the underlying error text. Because the
// function never returns normally, the deferred Shutdown only runs while
// unwinding from one of those panics.
func Consumer() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer([]string{conf.ROCKETMQBROKER}),
		consumer.WithGroupName("test"),
	)
	if err != nil {
		panic("创建消费者失败: " + err.Error())
	}
	defer func() {
		if err := c.Shutdown(); err != nil {
			panic("关闭consumer失败: " + err.Error())
		}
	}()

	// handle prints every message of the delivered batch and reports success.
	handle := func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			now := time.Now().Format("2006-01-02 15:04:05")
			fmt.Printf("%s 读取到一条消息,消息内容: %s \n", now, string(msg.Body))
		}
		return consumer.ConsumeSuccess, nil
	}

	// A failed subscription must not be ignored: starting the consumer anyway
	// would leave it running with nothing to consume.
	if err := c.Subscribe("SimpleTopic", consumer.MessageSelector{}, handle); err != nil {
		panic("读取消息失败: " + err.Error())
	}
	if err := c.Start(); err != nil {
		panic("启动consumer失败: " + err.Error())
	}

	// Keep the consumer alive: block this goroutine indefinitely.
	select {}
}
